// SPDX-License-Identifier: GPL-3.0-or-later

#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"
#include "aclk_lws_https_client.h"
#include "aclk_common.h"

int aclk_shutting_down = 0;
// State-machine for the on-connect metadata transmission.
// TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
//       agent startup phase.
static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
static AGENT_STATE agent_state = AGENT_INITIALIZING;

// Other global state
static int aclk_subscribed = 0;
static int aclk_disable_single_updates = 0;
static time_t last_init_sequence = 0;   // updated when collectors appear/disappear while AGENT_INITIALIZING
static int waiting_init = 1;            // while non-zero, aclk_queue_query() rejects every command
static char *aclk_username = NULL;
static char *aclk_password = NULL;

static char *global_base_topic = NULL;  // computed once by create_publish_base_topic()
static int aclk_connecting = 0;
int aclk_connected = 0;             // Exposed in the web-api

// aclk_mutex:      serializes creation of global_base_topic
// query_mutex:     protects modifications of aclk_queue
// collector_mutex: protects collector_list
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;

#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)

#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)

#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)

// Condition variable + mutex used to wake up the query processing thread.
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;

// Like every other lock macro above, these expand to a single expression with
// no trailing ';' so they are safe inside unbraced if/else statements.
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)

void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);

/*
 * Maintain a list of collectors and chart count
 * If all the charts of a collector are deleted
 * then a new metadata dataset must be sent to the cloud
 *
 */
struct _collector {
    time_t created;          // NOTE(review): not assigned by any code visible in this file
    uint32_t count; // number of charts currently registered for this collector
    uint32_t hostname_hash;  // simple_hash(hostname)
    uint32_t plugin_hash;    // simple_hash(plugin_name), or 1 when plugin_name is NULL
    uint32_t module_hash;    // simple_hash(module_name), or 1 when module_name is NULL
    char *hostname;          // owned copy (strdupz)
    char *plugin_name;       // owned copy, may be NULL
    char *module_name;       // owned copy, may be NULL
    struct _collector *next;
};

// Head of the collector list. The first node is a zero-filled "dummy" that
// _find_collector() allocates lazily; real collectors follow it.
// Guarded by COLLECTOR_LOCK.
struct _collector *collector_list = NULL;

/*
 * One pending unit of work for the ACLK query thread.
 * The strings it points to are released together with it by aclk_query_free().
 */
struct aclk_query {
    time_t created;   // When the entry was queued (now_realtime_sec())
    time_t run_after; // Delay run until after this time
    ACLK_CMD cmd;     // What command is this
    char *topic;      // Topic to respond to
    char *data;       // Internal data (NULL if request from the cloud)
    char *msg_id;     // msg_id generated by the cloud (NULL if internal)
    char *query;      // The actual query
    u_char deleted;   // Mark deleted for garbage collect
    struct aclk_query *next;
};

/*
 * Pending queries, linked through `next` and kept ordered by run_after by
 * aclk_queue_query(). Head/tail/count are modified under QUERY_LOCK.
 */
struct aclk_query_queue {
    struct aclk_query *aclk_query_head;
    struct aclk_query *aclk_query_tail;
    uint64_t count;
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };

/*
 * Generate a new UUID and return its textual form as a freshly allocated,
 * NUL-terminated string. The caller owns the buffer and releases it with freez().
 */
char *create_uuid()
{
    uuid_t generated;
    char *text = mallocz(36 + 1); // 36 characters of text plus the terminator

    uuid_generate(generated);
    uuid_unparse(generated, text);
    return text;
}

/*
 * JSON walker callback that fills the struct aclk_request passed through
 * e->callback_data from a cloud-to-agent message.
 *
 * String members: "msg-id", "type", "callback-topic" are copied verbatim;
 * "payload" is URL-decoded into a new buffer (copied verbatim if decoding fails).
 * Numeric member: "version". Everything else is ignored.
 * Always returns 0 so the walk continues.
 */
int cloud_to_agent_parse(JSON_ENTRY *e)
{
    struct aclk_request *req = e->callback_data;

    if (e->type == JSON_STRING) {
        char *value = e->data.string;

        if (!strcmp(e->name, "msg-id")) {
            req->msg_id = strdupz(value);
        } else if (!strcmp(e->name, "type")) {
            req->type_id = strdupz(value);
        } else if (!strcmp(e->name, "callback-topic")) {
            req->callback_topic = strdupz(value);
        } else if (!strcmp(e->name, "payload") && likely(value)) {
            size_t value_len = strlen(value);

            req->payload = mallocz(value_len + 1);
            if (!url_decode_r(req->payload, value, value_len + 1))
                strcpy(req->payload, value);
        }
    } else if (e->type == JSON_NUMBER) {
        if (!strcmp(e->name, "version"))
            req->version = atoi(e->original_string);
    }

    return 0;
}


static RSA *aclk_private_key = NULL;
static int create_private_key()
{
    char filename[FILENAME_MAX + 1];
    snprintfz(filename, FILENAME_MAX, "%s/claim.d/private.pem", netdata_configured_user_config_dir);

    long bytes_read;
    char *private_key = read_by_filename(filename, &bytes_read);
    if (!private_key) {
        error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
        return 1;
    }
    debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);

    BIO *key_bio = BIO_new_mem_buf(private_key, -1);
    if (key_bio==NULL) {
        error("Claimed agent cannot establish ACLK - failed to create BIO for key");
        goto biofailed;
    }

    aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
    BIO_free(key_bio);
    if (aclk_private_key!=NULL)
    {
        freez(private_key);
        return 0;
    }
    char err[512];
    ERR_error_string_n(ERR_get_error(), err, sizeof(err));
    error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);

biofailed:
    freez(private_key);
    return 1;
}

/*
 * Exponential back-off for reconnection attempts.
 *
 * mode == 0 resets the back-off state (and re-seeds random()) and returns 0.
 * Otherwise the delay to wait before the next attempt is returned, in ms:
 *   - the first call after a reset returns 0;
 *   - subsequent calls return 2^n seconds plus 0..999 ms of random jitter,
 *     with n incremented on every call while 2^n < ACLK_MAX_BACKOFF_DELAY;
 *   - once 2^n reaches ACLK_MAX_BACKOFF_DELAY the delay is fixed at
 *     ACLK_MAX_BACKOFF_DELAY * 1000 ms (no jitter).
 *
 * Only the delay is computed; sleeping is left to the caller.
 */
unsigned long int aclk_reconnect_delay(int mode)
{
    static int fail = -1;

    if (mode == 0 || fail == -1) {
        srandom(time(NULL));
        fail = mode - 1;
        return 0;
    }

    unsigned long int base_delay = (1 << fail);
    if (base_delay < ACLK_MAX_BACKOFF_DELAY) {
        fail++;
        return (base_delay * 1000) + (random() % 1000);
    }

    return ACLK_MAX_BACKOFF_DELAY * 1000;
}

/*
 * Release a query together with every string it owns
 * (topic, query, data, msg_id). NULL is accepted and ignored.
 */

void aclk_query_free(struct aclk_query *this_query)
{
    if (likely(this_query)) {
        // freez() tolerates NULL members, so no per-field checks are needed
        freez(this_query->msg_id);
        freez(this_query->data);
        freez(this_query->query);
        freez(this_query->topic);
        freez(this_query);
    }
}

/*
 * Find where a query scheduled for `time_to_run` belongs in the run_after
 * ordered queue. Returns the entry the new query must be linked after, or
 * NULL if it must become the new head. Entries with an equal run_after keep
 * FIFO order (the new one goes after them).
 * Caller must hold QUERY_LOCK.
 */

struct aclk_query *aclk_query_find_position(time_t time_to_run)
{
    struct aclk_query *tail = aclk_queue.aclk_query_tail;

    // Fast path: the new entry goes after the current tail
    if (likely(tail) && tail->run_after <= time_to_run)
        return tail;

    struct aclk_query *prev = NULL;
    for (struct aclk_query *cur = aclk_queue.aclk_query_head; cur && cur->run_after <= time_to_run; cur = cur->next)
        prev = cur;

    return prev;
}

/*
 * NULL-tolerant field comparison used by aclk_query_find(): a NULL `wanted`
 * matches anything; otherwise `actual` must be non-NULL and equal to it.
 */
static inline int aclk_query_field_matches(const char *wanted, const char *actual)
{
    return !wanted || (actual && !strcmp(wanted, actual));
}

/*
 * Find the first non-deleted queued query with the same topic whose query,
 * data and msg_id match the given values (a NULL argument matches any value).
 * When found and last_query is non-NULL, *last_query receives the preceding
 * entry (NULL if the match is the head) so the caller can unlink it.
 * Returns the matching entry or NULL. `cmd` is currently not compared.
 *
 * Need to have a QUERY lock before calling this.
 */
struct aclk_query *
aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
{
    struct aclk_query *tmp_query, *prev_query = NULL;
    UNUSED(cmd);

    for (tmp_query = aclk_queue.aclk_query_head; tmp_query; prev_query = tmp_query, tmp_query = tmp_query->next) {
        if (unlikely(tmp_query->deleted))
            continue;

        // Queued entries may carry NULL query/data/msg_id (internal commands have
        // no msg_id; "collector"/"on_connect" commands have no query), so never
        // hand those fields to strcmp() unchecked.
        if (!topic || !tmp_query->topic || strcmp(tmp_query->topic, topic) != 0)
            continue;

        if (!aclk_query_field_matches(query, tmp_query->query) ||
            !aclk_query_field_matches(data, tmp_query->data) ||
            !aclk_query_field_matches(msg_id, tmp_query->msg_id))
            continue;

        if (likely(last_query))
            *last_query = prev_query;
        return tmp_query;
    }

    return NULL;
}

/*
 * Add a query to execute, the result will be sent to the specified topic.
 *
 * When `internal` is non-zero, topic and query are duplicated; otherwise the
 * topic, query and msg_id pointers are stored as-is. `data` is always duplicated.
 * `run_after` is a delay in seconds from now before the query becomes runnable.
 *
 * A queued, non-deleted entry matching (topic, data, msg_id, query) is replaced,
 * unless it is already scheduled for the same second; then nothing is queued
 * and 1 is returned. 1 is also returned while waiting_init is set.
 * Returns 0 when the query was queued.
 */

int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
    struct aclk_query *new_query, *tmp_query;

    // Ignore all commands while we wait for the agent to initialize
    if (unlikely(waiting_init))
        return 1;

    // Keep the absolute schedule in a time_t: storing "now + delay" back into
    // the int parameter truncates the timestamp.
    time_t time_to_run = now_realtime_sec() + run_after;

    QUERY_LOCK;
    struct aclk_query *last_query = NULL;

    tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
    if (unlikely(tmp_query)) {
        if (tmp_query->run_after == time_to_run) {
            QUERY_UNLOCK;
            QUERY_THREAD_WAKEUP;
            return 1;
        }

        if (last_query)
            last_query->next = tmp_query->next;
        else
            aclk_queue.aclk_query_head = tmp_query->next;

        // Never leave the tail pointing at the entry that is about to be freed
        if (tmp_query == aclk_queue.aclk_query_tail)
            aclk_queue.aclk_query_tail = last_query;

        debug(D_ACLK, "Removing double entry");
        aclk_query_free(tmp_query);
        aclk_queue.count--;
    }

    new_query = callocz(1, sizeof(struct aclk_query));
    new_query->cmd = aclk_cmd;
    if (internal) {
        new_query->topic = strdupz(topic);
        if (likely(query))
            new_query->query = strdupz(query);
    } else {
        new_query->topic = topic;
        new_query->query = query;
        new_query->msg_id = msg_id;
    }

    if (data)
        new_query->data = strdupz(data);

    new_query->next = NULL;
    new_query->created = now_realtime_sec();
    new_query->run_after = time_to_run;

    debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");

    tmp_query = aclk_query_find_position(time_to_run);
    if (tmp_query) {
        new_query->next = tmp_query->next;
        tmp_query->next = new_query;
    } else {
        new_query->next = aclk_queue.aclk_query_head;
        aclk_queue.aclk_query_head = new_query;
    }

    // The new entry is the last one whenever nothing follows it - this also
    // covers insertion into an empty queue, which previously left tail NULL.
    if (!new_query->next)
        aclk_queue.aclk_query_tail = new_query;

    aclk_queue.count++;

    QUERY_UNLOCK;
    QUERY_THREAD_WAKEUP;
    return 0;
}

/*
 * Queue a request received from the cloud for the query thread (as an
 * ACLK_CMD_CLOUD command, runnable immediately). internal == 0, so
 * aclk_queue_query() stores the callback topic, msg_id and payload pointers
 * directly instead of copying them. Returns aclk_queue_query()'s result.
 */
inline int aclk_submit_request(struct aclk_request *request)
{
    char *reply_topic = request->callback_topic;
    char *message_id = request->msg_id;
    char *payload = request->payload;

    return aclk_queue_query(reply_topic, NULL, message_id, payload, 0, 0, ACLK_CMD_CLOUD);
}

/*
 * Get the next query to process - NULL if nothing there
 * The caller needs to free memory by calling aclk_query_free()
 *
 *      topic
 *      query
 *      The structure itself
 *
 */
struct aclk_query *aclk_queue_pop()
{
    struct aclk_query *this_query;

    QUERY_LOCK;

    if (likely(!aclk_queue.aclk_query_head)) {
        QUERY_UNLOCK;
        return NULL;
    }

    this_query = aclk_queue.aclk_query_head;

    // Get rid of the deleted entries
    while (this_query && this_query->deleted) {
        aclk_queue.count--;

        aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;

        if (likely(!aclk_queue.aclk_query_head)) {
            aclk_queue.aclk_query_tail = NULL;
        }

        aclk_query_free(this_query);

        this_query = aclk_queue.aclk_query_head;
    }

    if (likely(!this_query)) {
        QUERY_UNLOCK;
        return NULL;
    }

    if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
        info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
        QUERY_UNLOCK;
        return NULL;
    }

    aclk_queue.count--;
    aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;

    if (likely(!aclk_queue.aclk_query_head)) {
        aclk_queue.aclk_query_tail = NULL;
    }

    QUERY_UNLOCK;
    return this_query;
}

/*
 * Return the base topic this agent publishes under; subtopics are appended
 * as base_topic/subtopic. The value is formatted once from
 * ACLK_TOPIC_STRUCTURE and the claimed id, cut at the first newline, cached in
 * global_base_topic and returned on later calls.
 * Returns NULL when the agent is not claimed.
 * TODO: check whether enough information is always available at init time.
 */

char *create_publish_base_topic()
{
    if (unlikely(!is_agent_claimed()))
        return NULL;

    ACLK_LOCK;

    if (!global_base_topic) {
        char topic_buf[ACLK_MAX_TOPIC + 1];

        snprintf(topic_buf, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());

        char *newline = strchr(topic_buf, '\n');
        if (unlikely(newline != NULL))
            *newline = '\0';

        global_base_topic = strdupz(topic_buf);
    }

    ACLK_UNLOCK;
    return global_base_topic;
}

/*
 * Build a topic based on sub_topic and final_topic.
 *
 * - A sub_topic starting with '/' is absolute and is returned unchanged.
 *   A NULL sub_topic is also returned unchanged (NULL).
 * - If the base topic has not been computed yet, sub_topic is returned unchanged.
 * - Otherwise "<base>/<sub_topic>" is written into final_topic (max_size bytes
 *   including the terminator) and final_topic is returned. Truncation or a
 *   formatting error is only reported through a debug message.
 */

char *get_topic(char *sub_topic, char *final_topic, int max_size)
{
    int rc;

    // A NULL sub_topic would otherwise reach snprintf("%s"), which is undefined
    if (unlikely(!sub_topic) || sub_topic[0] == '/')
        return sub_topic;

    if (unlikely(!global_base_topic))
        return sub_topic;

    rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
    if (unlikely(rc < 0 || rc >= max_size))
        debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);

    return final_topic;
}

/*
 * Release a collector node and the name strings it owns.
 */

static void _free_collector(struct _collector *collector)
{
    // freez() accepts NULL, so optional names need no guard
    freez(collector->hostname);
    freez(collector->module_name);
    freez(collector->plugin_name);
    freez(collector);
}

/*
 * Debug helper: log every tracked collector (the dummy head node is skipped).
 * Only compiled with ACLK_DEBUG.
 */
#ifdef ACLK_DEBUG
static void _dump_collector_list()
{
    COLLECTOR_LOCK;

    info("DUMPING ALL COLLECTORS");

    struct _collector *node = collector_list ? collector_list->next : NULL;
    if (unlikely(!node)) {
        COLLECTOR_UNLOCK;
        info("DUMPING ALL COLLECTORS -- nothing found");
        return;
    }

    for (; node; node = node->next) {
        info(
            "COLLECTOR %s : [%s:%s] count = %u", node->hostname,
            node->plugin_name ? node->plugin_name : "",
            node->module_name ? node->module_name : "", node->count);
    }

    info("DUMPING ALL COLLECTORS DONE");
    COLLECTOR_UNLOCK;
}
#endif

/*
 * Free every collector after the dummy head, leaving an empty list (the
 * dummy head itself is kept and its count reset). Acquires COLLECTOR_LOCK
 * itself; the detached nodes are freed after the lock is released.
 */
static void _reset_collector_list()
{
    COLLECTOR_LOCK;

    struct _collector *detached = collector_list ? collector_list->next : NULL;
    if (likely(detached)) {
        // Unlink the whole chain from the dummy head while holding the lock
        collector_list->count = 0;
        collector_list->next = NULL;
    }

    COLLECTOR_UNLOCK;

    while (detached) {
        struct _collector *following = detached->next;
        _free_collector(detached);
        detached = following;
    }
}

/*
 * Look up the collector for hostname/plugin/module. Must lock before calling.
 *
 * On first use this allocates the dummy head node and returns NULL.
 * Hashes are compared first (a NULL plugin/module name hashes to 1), then the
 * hostname string, then plugin/module strings when both sides are non-NULL.
 * If last_collector is not NULL and a match is found, it receives the node
 * preceding the match (possibly the dummy head) - used in collector delete.
 * Returns NULL when nothing matches.
 */
static struct _collector *_find_collector(
    const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
{
    if (unlikely(!collector_list)) {
        collector_list = callocz(1, sizeof(struct _collector));
        return NULL;
    }

    if (unlikely(!collector_list->next))
        return NULL;

    uint32_t want_plugin = plugin_name ? simple_hash(plugin_name) : 1;
    uint32_t want_module = module_name ? simple_hash(module_name) : 1;
    uint32_t want_host = simple_hash(hostname);

    struct _collector *prev = collector_list;
    for (struct _collector *node = collector_list->next; node; prev = node, node = node->next) {
        if (want_plugin != node->plugin_hash || want_module != node->module_hash || want_host != node->hostname_hash)
            continue;
        if (strcmp(hostname, node->hostname) != 0)
            continue;
        if (plugin_name && node->plugin_name && strcmp(plugin_name, node->plugin_name) != 0)
            continue;
        if (module_name && node->module_name && strcmp(module_name, node->module_name) != 0)
            continue;

        if (unlikely(last_collector))
            *last_collector = prev;
        return node;
    }

    return NULL;
}

/*
 * Drop one chart from the matching collector. When its chart count reaches
 * zero the node is unlinked from the list but not freed: the caller receives
 * it and must release it with _free_collector().
 * Returns the matching collector, or NULL if none exists.
 * Must be called with COLLECTOR_LOCK held.
 */
static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *predecessor = NULL;
    struct _collector *found = _find_collector(hostname, plugin_name, module_name, &predecessor);

    if (unlikely(!found))
        return NULL;

    if (unlikely(--found->count == 0))
        predecessor->next = found->next;

    return found;
}

/*
 * Register one more chart for hostname/plugin/module. An existing collector
 * just gets its chart count bumped; otherwise a new node is created and
 * linked right after the dummy head. Returns the collector.
 *
 * Lock (COLLECTOR_LOCK) before calling.
 */
static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *entry = _find_collector(hostname, plugin_name, module_name, NULL);

    if (unlikely(entry == NULL)) {
        entry = callocz(1, sizeof(*entry));

        entry->hostname = strdupz(hostname);
        entry->hostname_hash = simple_hash(hostname);

        // Missing names stay NULL (callocz) and hash to 1
        if (plugin_name) {
            entry->plugin_name = strdupz(plugin_name);
            entry->plugin_hash = simple_hash(plugin_name);
        } else {
            entry->plugin_hash = 1;
        }

        if (module_name) {
            entry->module_name = strdupz(module_name);
            entry->module_hash = simple_hash(module_name);
        } else {
            entry->module_hash = 1;
        }

        entry->next = collector_list->next;
        collector_list->next = entry;
    }

    entry->count++;
    debug(
        D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
        module_name ? module_name : "*", entry->count);
    return entry;
}

/*
 * Add a new collector to the list.
 * If it exists, update the chart count.
 *
 * When this is the collector's first chart, either the agent-initialization
 * timestamp is refreshed (while AGENT_INITIALIZING) or an on-connect metadata
 * command is queued so the cloud receives the updated collector set.
 */
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    COLLECTOR_LOCK;

    struct _collector *tmp_collector = _add_collector(hostname, plugin_name, module_name);
    int first_chart = (tmp_collector->count == 1);

    // Release COLLECTOR_LOCK before touching the query queue (which takes
    // QUERY_LOCK), exactly like aclk_del_collector() does: no nested locking.
    COLLECTOR_UNLOCK;

    if (likely(!first_chart))
        return;

    if (unlikely(agent_state == AGENT_INITIALIZING))
        last_init_sequence = now_realtime_sec();
    else if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
        debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}

/*
 * Remove one chart from a collector. When the collector's last chart goes
 * away it is taken out of the list, the cloud update is scheduled (or the
 * initialization timestamp refreshed while AGENT_INITIALIZING), and the
 * collector's memory is released.
 */
void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    COLLECTOR_LOCK;

    struct _collector *removed = _del_collector(hostname, plugin_name, module_name);
    int last_chart_gone = removed && !removed->count;

    if (last_chart_gone) {
        debug(
            D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*",
            module_name ? module_name : "*", removed->count);
    }

    COLLECTOR_UNLOCK;

    if (!last_chart_gone)
        return;

    if (unlikely(agent_state == AGENT_INITIALIZING)) {
        last_init_sequence = now_realtime_sec();
    } else if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
        debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
    }

    _free_collector(removed);
}

/*
 * Execute a cloud request whose query is an "/api/v1/..." URL through the
 * local web API (as a web client with acl 0x1f) and publish the encoded
 * response on this_query->topic, tagged with this_query->msg_id.
 *
 * Note: this_query->query is modified in place (the '?' becomes '\0').
 *
 * Returns 0 when the request was executed and the reply sent, 1 when there is
 * no query or it is not an /api/v1/ request (nothing is sent then).
 */
int aclk_execute_query(struct aclk_query *this_query)
{
    static const char api_v1_prefix[] = "/api/v1/";

    // cloud_to_agent_parse() only fills the payload (our query) when the field
    // is present, so a cloud request can reach us with a NULL query.
    if (unlikely(!this_query->query))
        return 1;

    if (strncmp(this_query->query, api_v1_prefix, sizeof(api_v1_prefix) - 1) != 0)
        return 1;

    struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
    w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
    w->cookie1[0] = 0;      // Simulate web_client_create_on_fd()
    w->cookie2[0] = 0;      // Simulate web_client_create_on_fd()
    w->acl = 0x1f;

    char *mysep = strchr(this_query->query, '?');
    if (mysep) {
        strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
        *mysep = '\0';
    } else
        strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);

    // The API command is the last path component, e.g. "data" in /api/v1/data
    mysep = strrchr(this_query->query, '/');

    // TODO: handle bad response perhaps in a different way. For now it does to the payload
    int rc = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
    BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    buffer_flush(local_buffer);
    local_buffer->contenttype = CT_APPLICATION_JSON;

    aclk_create_header(local_buffer, "http", this_query->msg_id);

    char *encoded_response = aclk_encode_response(w->response.data);

    buffer_sprintf(
        local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\"\n}", rc, encoded_response);

    // Closes the object opened by aclk_create_header() - NOTE(review): assumed, header not visible here
    buffer_sprintf(local_buffer, "\n}");

    aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);

    buffer_free(w->response.data);
    freez(w);
    buffer_free(local_buffer);
    freez(encoded_response);
    return 0;
}

/*
 * This function will fetch the next pending command and process it
 *
 */
int aclk_process_query()
{
    struct aclk_query *this_query;
    static long int query_count = 0;

    if (!aclk_connected)
        return 0;

    this_query = aclk_queue_pop();
    if (likely(!this_query)) {
        return 0;
    }

    if (unlikely(this_query->deleted)) {
        debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
        aclk_query_free(this_query);
        return 1;
    }
    query_count++;

    debug(
        D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
        this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));

    switch (this_query->cmd) {
        case ACLK_CMD_ONCONNECT:
            debug(D_ACLK, "EXECUTING on connect metadata command");
            aclk_send_metadata();
            aclk_metadata_submitted = ACLK_METADATA_SENT;
            break;

        case ACLK_CMD_CHART:
            debug(D_ACLK, "EXECUTING a chart update command");
            aclk_send_single_chart(this_query->data, this_query->query);
            break;

        case ACLK_CMD_CHARTDEL:
            debug(D_ACLK, "EXECUTING a chart delete command");
            //TODO: This send the info metadata for now
            aclk_send_info_metadata();
            break;

        case ACLK_CMD_ALARM:
            debug(D_ACLK, "EXECUTING an alarm update command");
            aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
            break;

        case ACLK_CMD_ALARMS:
            debug(D_ACLK, "EXECUTING an alarms update command");
            aclk_send_alarm_metadata();
            break;

        case ACLK_CMD_CLOUD:
            debug(D_ACLK, "EXECUTING a cloud command");
            aclk_execute_query(this_query);
            break;

        default:
            break;
    }
    debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);

    aclk_query_free(this_query);

    return 1;
}

/*
 * Run every currently runnable query.
 * Returns 0 when nothing was attempted (shutting down, link down or empty
 * queue) and 1 otherwise.
 */

int aclk_process_queries()
{
    if (unlikely(netdata_exit || !aclk_connected) || likely(!aclk_queue.count))
        return 0;

    debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count);

    // Keep going until aclk_process_query() reports nothing left to do.
    // TODO: may consider possible throttling here
    for (;;) {
        if (!aclk_process_query())
            break;
    }

    return 1;
}

static void aclk_query_thread_cleanup(void *ptr)
{
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    COLLECTOR_LOCK;

    _reset_collector_list();
    freez(collector_list);

    COLLECTOR_UNLOCK;

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

/**
 * Main query processing thread
 *
 * On startup wait for the agent collectors to initialize
 * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
 * of no new collectors coming in in order to mark the agent
 * as stable (set agent_state = AGENT_STABLE)
 */
void *aclk_query_main_thread(void *ptr)
{
    netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);

    while (agent_state == AGENT_INITIALIZING && !netdata_exit) {
        time_t checkpoint;

        checkpoint = now_realtime_sec() - last_init_sequence;
        if (checkpoint > ACLK_STABLE_TIMEOUT) {
            agent_state = AGENT_STABLE;
            info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
#ifdef ACLK_DEBUG
            _dump_collector_list();
#endif
            break;
        }
        info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint);
        sleep_usec(USEC_PER_SEC * 1);
    }

    while (!netdata_exit) {
        if (unlikely(!aclk_metadata_submitted)) {
            aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;
            if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
                errno = 0;
                error("ACLK failed to queue on_connect command");
                aclk_metadata_submitted = 0;
            }
        }

        aclk_process_queries();

        QUERY_THREAD_LOCK;

        // TODO: Need to check if there are queries awaiting already
        if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
            sleep_usec(USEC_PER_SEC * 1);

        QUERY_THREAD_UNLOCK;

    } // forever
    info("Shutting down query processing thread");
    netdata_thread_cleanup_pop(1);
    return NULL;
}

// Thread cleanup
// Cleanup handler of the main ACLK thread. When the agent is claimed and the
// link is up it: wakes the query thread, publishes a "disconnect" message
// (payload "graceful") on ACLK_METADATA_TOPIC, services the link for up to ~5 s
// while the write queue drains, sets aclk_shutting_down, shuts the link down,
// notifies the MQTT layer, and services the link again for up to ~5 s.
// The statement order below is significant.
static void aclk_main_cleanup(void *ptr)
{
    char payload[512];
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    if (is_agent_claimed() && aclk_connected) {
        size_t write_q, write_q_bytes, read_q;
        time_t event_loop_timeout;

        // Wakeup thread to cleanup
        QUERY_THREAD_WAKEUP;
        // Send a graceful disconnect message
        char *msg_id = create_uuid();

        // Split the current time into whole seconds + the microsecond remainder
        usec_t time_created_offset_usec = now_realtime_usec();
        time_t time_created = time_created_offset_usec / USEC_PER_SEC;
        time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;

        snprintfz(
            payload, 511,
            "{ \"type\": \"disconnect\","
            " \"msg-id\": \"%s\","
            " \"timestamp\": %ld,"
            " \"timestamp-offset-usec\": %llu,"
            " \"version\": %d,"
            " \"payload\": \"graceful\" }",
            msg_id, time_created, time_created_offset_usec, ACLK_VERSION);

        aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id);
        freez(msg_id);

        // Service the link until the write queue is empty or ~5 seconds pass
        event_loop_timeout = now_realtime_sec() + 5;
        write_q = 1;
        while (write_q && event_loop_timeout > now_realtime_sec()) {
            _link_event_loop();
            lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
        }

        aclk_shutting_down = 1;
        _link_shutdown();
        aclk_lws_wss_mqtt_layer_disconect_notif();

        // Give the shutdown traffic the same bounded chance to be written
        write_q = 1;
        event_loop_timeout = now_realtime_sec() + 5;
        while (write_q && event_loop_timeout > now_realtime_sec()) {
            _link_event_loop();
            lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
        }
    }

    info("Disconnected");

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

// Callback state for json_extract_singleton(): `key` names the JSON string
// member to look for; `result` receives a strdupz() copy of its value and is
// left untouched when no such member is seen.
struct dictionary_singleton {
    char *key;
    char *result;
};

/*
 * JSON walker callback: when a string member whose name equals
 * ((struct dictionary_singleton *)e->callback_data)->key is visited, a copy
 * of its value is stored in ->result. Other entries are ignored.
 * Always returns 0 so the walk continues.
 */
int json_extract_singleton(JSON_ENTRY *e)
{
    struct dictionary_singleton *wanted = e->callback_data;

    if (e->type == JSON_STRING && !strcmp(e->name, wanted->key))
        wanted->result = strdupz(e->data.string);

    return 0;
}

/*
 * Return the 6-bit value of one base-64 alphabet character ('A'-'Z', 'a'-'z', '0'-'9',
 * '+', '/'). Any other byte yields 0, so invalid input decodes to defined (if
 * meaningless) bits rather than shifting a negative value.
 * Computed on the fly, so no lazily-initialised shared table is needed.
 */
static uint32_t base64_sextet(unsigned char c)
{
    if (c >= 'A' && c <= 'Z')
        return (uint32_t)(c - 'A');
    if (c >= 'a' && c <= 'z')
        return (uint32_t)(c - 'a') + 26;
    if (c >= '0' && c <= '9')
        return (uint32_t)(c - '0') + 52;
    if (c == '+')
        return 62;
    if (c == '/')
        return 63;
    return 0;
}

// Base-64 decoder.
// Note: This is non-validating, invalid input will be decoded without an error
//       (characters outside the alphabet contribute zero bits).
//       Challenges are packed into json strings so we don't skip newlines.
//       Size errors (i.e. empty input, invalid input size or insufficient output space) are caught.
// Returns the number of bytes written to `output`, or 0 (after logging) on a size error.
size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
{
    // An empty input has no final quantum to inspect below, so reject it up front.
    if (input_size == 0 || (input_size & 3) != 0)
    {
        error("Can't decode base-64 input length %zu", input_size);
        return 0;
    }
    size_t unpadded_size = (input_size / 4) * 3;
    if (unpadded_size > output_size)
    {
        error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
        return 0;
    }

    // Every quantum except the last one is decoded as 4 data characters; padding is not
    // checked for inside full quantums.
    for (size_t quantum = 1; quantum < input_size / 4; quantum++)
    {
        uint32_t value = (base64_sextet(input[0]) << 18) | (base64_sextet(input[1]) << 12) |
                         (base64_sextet(input[2]) << 6) | base64_sextet(input[3]);
        output[0] = (unsigned char)(value >> 16);
        output[1] = (unsigned char)(value >> 8);
        output[2] = (unsigned char)value;
        output += 3;
        input += 4;
    }

    // Padding is only handled in the last quantum.
    if (input[2] == '=') {
        // "xx==": one output byte.
        uint32_t value = (base64_sextet(input[0]) << 6) | base64_sextet(input[1]);
        output[0] = (unsigned char)(value >> 4);
        return unpadded_size - 2;
    }
    if (input[3] == '=') {
        // "xxx=": two output bytes.
        uint32_t value = (base64_sextet(input[0]) << 12) | (base64_sextet(input[1]) << 6) |
                         base64_sextet(input[2]);
        output[0] = (unsigned char)(value >> 10);
        output[1] = (unsigned char)(value >> 2);
        return unpadded_size - 1;
    }

    // Unpadded last quantum: three output bytes, decoded through the alphabet like the rest.
    uint32_t value = (base64_sextet(input[0]) << 18) | (base64_sextet(input[1]) << 12) |
                     (base64_sextet(input[2]) << 6) | base64_sextet(input[3]);
    output[0] = (unsigned char)(value >> 16);
    output[1] = (unsigned char)(value >> 8);
    output[2] = (unsigned char)value;
    return unpadded_size;
}

/*
 * Base-64 encoder (standard alphabet, '=' padding, no line breaks).
 *
 * Writes the encoding of `input_size` bytes from `input` into `output` WITHOUT a NUL
 * terminator and returns the number of characters written.
 * Returns 0 (after logging) when `output_size` cannot hold the encoding plus at least
 * one spare byte, which leaves room for the caller to add a terminator.
 */
size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
{
    static const char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                 "abcdefghijklmnopqrstuvwxyz"
                                 "0123456789+/";
    uint32_t value;

    // Equivalent to "(input_size/3+1)*4 < output_size", rearranged so the
    // multiplication cannot wrap around for huge input sizes.
    if (output_size == 0 || input_size / 3 + 1 > (output_size - 1) / 4)
    {
        error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
        return 0;
    }
    size_t count = 0;
    // Every complete 3-byte group, including a final one when input_size is a multiple of 3.
    while (input_size >= 3)
    {
        value = ((uint32_t)input[0] << 16) | ((uint32_t)input[1] << 8) | input[2];
        output[0] = lookup[value >> 18];
        output[1] = lookup[(value >> 12) & 0x3f];
        output[2] = lookup[(value >> 6) & 0x3f];
        output[3] = lookup[value & 0x3f];
        output += 4;
        input += 3;
        input_size -= 3;
        count += 4;
    }
    // 0, 1 or 2 bytes remain; a partial group is padded with '='.
    switch (input_size)
    {
        case 2:
            value = ((uint32_t)input[0] << 10) | ((uint32_t)input[1] << 2);
            output[0] = lookup[(value >> 12) & 0x3f];
            output[1] = lookup[(value >> 6) & 0x3f];
            output[2] = lookup[value & 0x3f];
            output[3] = '=';
            count += 4;
            break;
        case 1:
            value = (uint32_t)input[0] << 4;
            output[0] = lookup[(value >> 6) & 0x3f];
            output[1] = lookup[value & 0x3f];
            output[2] = '=';
            output[3] = '=';
            count += 4;
            break;
        default:
            break;
    }
    return count;
}



int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
{
    int  result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
    if (result == -1) {
        char err[512];
        ERR_error_string_n(ERR_get_error(), err, sizeof(err));
        error("Decryption of the challenge failed: %s", err);
    }
    return result;
}

/*
 * Locate the body of a raw HTTP response held in `b`.
 * The body is the text after the first empty line; '\r' characters are ignored when
 * deciding whether a line is empty.
 * Returns a pointer into b->buffer just past that empty line. Returns NULL if no empty
 * line occurs within b->len bytes, or if a NUL byte is met before one.
 */
char *extract_payload(BUFFER *b)
{
    unsigned int chars_on_line = 0;
    char *end = b->buffer + b->len;

    for (char *p = b->buffer; p < end; p++) {
        switch (*p) {
            case '\0':
                return NULL;
            case '\r':
                // carriage returns do not count towards the line length
                break;
            case '\n':
                if (chars_on_line == 0)
                    return p + 1;
                chars_on_line = 0;
                break;
            default:
                chars_on_line++;
                break;
        }
    }
    return NULL;
}

/*
 * Run the challenge/response handshake with the cloud auth service. On success the
 * MQTT credentials are stored in aclk_username / aclk_password.
 *
 *   1. GET  /api/v1/auth/node/<agent_id>/challenge  -> {"challenge": "<base64>"}
 *   2. base-64 decode the challenge, decrypt it with the agent's private key and
 *      base-64 encode the plaintext again
 *   3. POST /api/v1/auth/node/<agent_id>/password   <- {"response": "<base64>"}
 *                                                   -> {"password": "..."}
 *
 * Any previous aclk_password is discarded up front, so it is left NULL on every failure
 * path. aclk_try_to_connect() checks exactly that to detect failure.
 * A failure at any step aborts the handshake instead of posting an empty or garbage
 * response.
 */
void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
{
    // Zeroed so that the error paths below log an empty string, not garbage, if no response was written.
    char *data_buffer = callocz(1, NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    debug(D_ACLK, "Performing challenge-response sequence");
    if (aclk_password != NULL)
    {
        freez(aclk_password);
        aclk_password = NULL;
    }
    // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
    // TODO - target host?
    char *agent_id = is_agent_claimed();
    if (agent_id == NULL)
    {
        error("Agent was not claimed - cannot perform challenge/response");
        goto CLEANUP;
    }
    char url[1024];
    snprintfz(url, sizeof(url) - 1, "/api/v1/auth/node/%s/challenge", agent_id);
    info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url);
    if(aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
    {
        error("Challenge failed: %s", data_buffer);
        goto CLEANUP;
    }
    struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };

    debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
    if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
    {
        freez(challenge.result);
        error("Could not parse the json response with the challenge: %s", data_buffer);
        goto CLEANUP;
    }
    if (challenge.result == NULL ) {
        error("Could not retrieve challenge from auth response: %s", data_buffer);
        goto CLEANUP;
    }

    unsigned char decoded[512];
    size_t decoded_len = base64_decode((unsigned char*)challenge.result, strlen(challenge.result), decoded, sizeof(decoded));
    freez(challenge.result);
    if (decoded_len == 0) {
        error("Could not base-64 decode the challenge from the auth response");
        goto CLEANUP;
    }

    unsigned char plaintext[4096] = { 0 };
    int decrypted_length = private_decrypt(decoded, (int)decoded_len, plaintext);
    if (decrypted_length < 0)
        goto CLEANUP;   // private_decrypt() has already logged the OpenSSL error

    char encoded[512];
    size_t encoded_len = base64_encode(plaintext, (size_t)decrypted_length, encoded, sizeof(encoded));
    if (encoded_len == 0) {
        error("Could not base-64 encode the decrypted challenge (%d bytes)", decrypted_length);
        goto CLEANUP;
    }
    encoded[encoded_len] = 0;
    debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);

    char response_json[4096] = { 0 };
    snprintfz(response_json, sizeof(response_json) - 1, "{\"response\":\"%s\"}", encoded);
    debug(D_ACLK, "Password phase: %s",response_json);
    // TODO - host
    snprintfz(url, sizeof(url) - 1, "/api/v1/auth/node/%s/password", agent_id);
    if(aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
    {
        error("Challenge-response failed: %s", data_buffer);
        goto CLEANUP;
    }

    debug(D_ACLK, "Password response from cloud: %s", data_buffer);

    struct dictionary_singleton password = { .key = "password", .result = NULL };
    if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
    {
        freez(password.result);
        error("Could not parse the json response with the password: %s", data_buffer);
        goto CLEANUP;
    }

    if (password.result == NULL ) {
        error("Could not retrieve password from auth response");
        goto CLEANUP;
    }
    if (aclk_password != NULL )
        freez(aclk_password);
    if (aclk_username == NULL)
        aclk_username = strdupz(agent_id);
    aclk_password = password.result;

CLEANUP:
    freez(data_buffer);
    return;
}

/*
 * Obtain fresh credentials via aclk_get_challenge() and, if a password was obtained,
 * start an MQTT connection attempt. Without a password it returns without trying.
 *
 * aclk_connecting is raised *before* mqtt_attempt_connection(). If a link-down callback
 * (aclk_disconnect(), which clears the flag) happens to run while the attempt is in
 * progress, its reset is no longer overwritten afterwards. An overwritten reset would
 * leave aclk_main() waiting on an attempt that already ended, because it only schedules
 * a retry while aclk_connecting == 0.
 */
static void aclk_try_to_connect(char *hostname, char *port, int port_num)
{
    info("Attempting to establish the agent cloud link");
    aclk_get_challenge(hostname, port);
    if (aclk_password == NULL)
        return;

    aclk_connecting = 1;
    int rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
    if (unlikely(rc)) {
        error("Failed to initialize the agent cloud link library");
    }
}


/**
 * Main agent cloud link thread
 *
 * Waits for netdata to be ready and for the agent to be claimed, loads the private key
 * and initialises the MQTT library. It then runs the main event loop that handles
 * pending requests - both inbound and outbound. While the link is down it
 * (re)connects with a back-off. Once the link is up it subscribes to the command topic
 * and starts the query thread.
 *
 * Every exit path leaves through netdata_thread_cleanup_pop(1), so the handler pushed
 * at the top (aclk_main_cleanup) also runs when the thread stops early. That covers
 * cloud support being disabled and an unusable base URL. A plain `return` between the
 * push and the pop would skip the handler, and POSIX leaves that undefined for
 * pthread_cleanup_push()/pthread_cleanup_pop(). This assumes the netdata_ wrappers map
 * onto those calls.
 *
 * @param ptr is a pointer to the netdata_static_thread structure.
 *
 * @return It always returns NULL
 */
void *aclk_main(void *ptr)
{
    struct netdata_static_thread *query_thread;

    netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
    if (!netdata_cloud_setting) {
        info("Killing ACLK thread -> cloud functionality has been disabled");
        goto thread_exit;
    }

    info("Waiting for netdata to be ready");
    while (!netdata_ready) {
        sleep_usec(USEC_PER_MS * 300);
    }

    last_init_sequence = now_realtime_sec();
    query_thread = NULL;

    char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
    char *aclk_port = NULL;
    uint32_t port_num = 0;
    char *cloud_base_url = config_get(CONFIG_SECTION_CLOUD, "cloud base url", DEFAULT_CLOUD_BASE_URL);
    if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
        error("Configuration error - cannot use agent cloud link");
        goto thread_exit;
    }
    port_num = atoi(aclk_port);     // SSL library uses the string, MQTT uses the numeric value

    info("Waiting for netdata to be claimed");
    while(1) {
        while (likely(!is_agent_claimed())) {
            sleep_usec(USEC_PER_SEC * 5);
            if (netdata_exit)
                goto exited;
        }
        if (!create_private_key() && !_mqtt_lib_init())
            break;

        if (netdata_exit)
            goto exited;

        sleep_usec(USEC_PER_SEC * 60);
    }
    create_publish_base_topic();
    // NOTE(review): the loop above only breaks after create_private_key() succeeded, so
    // this second call looks redundant. Whether it leaks the first key depends on
    // create_private_key() (not visible here) - confirm before removing it.
    create_private_key();

    usec_t reconnect_expiry = 0; // In usecs

    while (!netdata_exit) {
        static int first_init = 0;
        size_t write_q, write_q_bytes, read_q;
        lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
        //info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
        //   first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
        if (unlikely(!netdata_exit && !aclk_connected)) {
            if (unlikely(!first_init)) {
                aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
                first_init = 1;
            } else {
                if (aclk_connecting == 0) {
                    if (reconnect_expiry == 0) {
                        unsigned long int delay = aclk_reconnect_delay(1);
                        reconnect_expiry = now_realtime_usec() + delay * 1000;
                        info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
                    }
                    if (now_realtime_usec() >= reconnect_expiry) {
                        reconnect_expiry = 0;
                        aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
                    }
                    sleep_usec(USEC_PER_MS * 100);
                }
            }
            if (aclk_connecting) {
                _link_event_loop();
                sleep_usec(USEC_PER_MS * 100);
            }
            continue;
        }

        _link_event_loop();
        /*static int stress_counter = 0;
        if (write_q_bytes==0 && stress_counter ++ >5)
        {
            aclk_send_stress_test(8000000);
            stress_counter = 0;
        }*/

        // TODO: Move to on-connect
        if (unlikely(!aclk_subscribed)) {
            aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
        }

        if (unlikely(!query_thread)) {
            query_thread = callocz(1, sizeof(struct netdata_static_thread));
            query_thread->thread = mallocz(sizeof(netdata_thread_t));
            netdata_thread_create(
                query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
                query_thread);
        }
    } // forever
exited:
    aclk_shutdown();

    freez(aclk_username);
    freez(aclk_password);
    freez(aclk_hostname);
    freez(aclk_port);
    if (aclk_private_key != NULL) {
        RSA_free(aclk_private_key);
        aclk_private_key = NULL; // do not leave a dangling global behind
    }

thread_exit:
    netdata_thread_cleanup_pop(1);
    return NULL;
}

/*
 * Send a message to the cloud, using a base topic and sib_topic
 * The final topic will be in the form <base_topic>/<sub_topic>
 * If base_topic is missing then the global_base_topic will be used (if available)
 *
 */
int aclk_send_message(char *sub_topic, char *message, char *msg_id)
{
    int rc;
    int mid;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    UNUSED(msg_id);

    if (!aclk_connected)
        return 0;

    if (unlikely(!message))
        return 0;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);

    if (unlikely(!final_topic)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    ACLK_LOCK;
    rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
    // TODO: link the msg_id with the mid so we can trace it
    ACLK_UNLOCK;

    if (unlikely(rc)) {
        errno = 0;
        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}

/*
 * Subscribe to a topic in the cloud
 * The final subscription will be in the form
 * /agent/claim_id/<sub_topic>
 */
int aclk_subscribe(char *sub_topic, int qos)
{
    int rc;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
    if (unlikely(!final_topic)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    if (!aclk_connected) {
        error("Cannot subscribe to %s - not connected!", topic);
        return 1;
    }

    ACLK_LOCK;
    rc = _link_subscribe(final_topic, qos);
    ACLK_UNLOCK;

    // TODO: Add better handling -- error will flood the logfile here
    if (unlikely(rc)) {
        errno = 0;
        error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}

/*
 * Link-up callback.
 * Marks the link as connected and leaves the "waiting for init" state. It then calls
 * aclk_reconnect_delay(0), which presumably resets the retry back-off, and wakes the
 * query thread.
 */
void aclk_connect()
{
    info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count);

    aclk_connected = 1;
    waiting_init = 0;
    aclk_reconnect_delay(0);

    QUERY_THREAD_WAKEUP;
}

/*
 * Link-down callback.
 * Logs the event (only if the link was connected) and clears all per-connection state.
 * The command topic is no longer subscribed, metadata is marked as required again, and
 * the connected/connecting flags are dropped so that aclk_main() schedules a new
 * connection attempt.
 */
void aclk_disconnect()
{
    if (likely(aclk_connected)) {
        info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count);
    }

    // State negotiated on the previous link
    aclk_subscribed = 0;
    aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
    waiting_init = 1;

    // Link state checked by aclk_main()
    aclk_connected = 0;
    aclk_connecting = 0;
}

/*
 * Mark the link as disconnected and shut the underlying link layer down via _link_shutdown().
 */
void aclk_shutdown()
{
    info("Shutdown initiated");

    aclk_connected = 0;
    _link_shutdown();

    info("Shutdown complete");
}

/*
 * Append the common ACLK message envelope to `dest`. It contains:
 *   - the type and msg-id (a fresh UUID is generated when msg_id is NULL),
 *   - the creation time split into whole seconds plus a microsecond offset,
 *   - ACLK_VERSION,
 *   - the opening of the "payload" member.
 * The caller appends the payload and the closing brace.
 */
inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
{
    char generated_id[36 + 1];

    if (unlikely(!msg_id)) {
        uuid_t new_uuid;
        uuid_generate(new_uuid);
        uuid_unparse(new_uuid, generated_id);
        msg_id = generated_id;
    }

    usec_t now_usec = now_realtime_usec();
    time_t time_created = now_usec / USEC_PER_SEC;
    usec_t offset_usec = now_usec % USEC_PER_SEC;

    buffer_sprintf(
        dest,
        "\t{\"type\": \"%s\",\n"
        "\t\"msg-id\": \"%s\",\n"
        "\t\"timestamp\": %ld,\n"
        "\t\"timestamp-offset-usec\": %llu,\n"
        "\t\"version\": %d,\n"
        "\t\"payload\": ",
        type, msg_id, time_created, offset_usec, ACLK_VERSION);

    debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created);
}

/*
 * Escape the contents of `contents` so the result can be embedded in a JSON string value.
 *   - '\n' and '\t' are dropped,
 *   - '"' and '\\' are backslash-escaped,
 *   - every other control character (0x00-0x1F) becomes a JSON "\u00XX" escape,
 *   - all other bytes are copied unchanged.
 * Returns a newly allocated (mallocz) NUL-terminated string owned by the caller.
 */
char *aclk_encode_response(BUFFER *contents)
{
    static const char hex_digits[] = "0123456789abcdef";
    size_t content_size = contents->len;
    // Worst case every byte becomes a 6-character "\u00XX" escape; +1 for the terminator.
    char *tmp_buffer = mallocz(content_size * 6 + 1);
    const char *src = contents->buffer;
    char *dst = tmp_buffer;

    while (content_size > 0) {
        unsigned char c = (unsigned char)*src;
        switch (c) {
            case '\n':
            case '\t':
                // dropped, not escaped
                break;
            case '\"':
            case '\\':
                *dst++ = '\\';
                *dst++ = (char)c;
                break;
            default:
                if (c < 0x20) {
                    *dst++ = '\\';
                    *dst++ = 'u';
                    *dst++ = '0';
                    *dst++ = '0';
                    *dst++ = hex_digits[c >> 4];
                    *dst++ = hex_digits[c & 0x0f];
                } else {
                    *dst++ = (char)c;
                }
                break;
        }
        src++;
        content_size--;
    }
    *dst = '\0';

    return tmp_buffer;
}

/*
 * Send the "connect_alarms" message on ACLK_ALARMS_TOPIC. Its payload object holds
 * the localhost alarm information in three members:
 *   "configured-alarms" - configured alarms
 *   "alarm-log"         - the alarm log
 *   "alarms-active"     - the currently active alarms
 */
void aclk_send_alarm_metadata()
{
    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(wb);
    wb->contenttype = CT_APPLICATION_JSON;

    debug(D_ACLK, "Metadata alarms start");

    aclk_create_header(wb, "connect_alarms", msg_id);

    // configured alarms
    buffer_sprintf(wb, "{\n\t \"configured-alarms\" : ");
    health_alarms2json(localhost, wb, 1);
    debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, wb->len);

    // alarm log
    buffer_sprintf(wb, ",\n\t \"alarm-log\" : ");
    health_alarm_log2json(localhost, wb, 0);
    debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, wb->len);

    // active alarms
    buffer_sprintf(wb, ",\n\t \"alarms-active\" : ");
    health_alarms_values2json(localhost, wb, 0);
    debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, wb->len);

    // close the payload object and the envelope
    buffer_sprintf(wb, "\n}\n}");
    aclk_send_message(ACLK_ALARMS_TOPIC, wb->buffer, msg_id);

    freez(msg_id);
    buffer_free(wb);
}

/*
 * Send the "connect" message on ACLK_METADATA_TOPIC. Its payload object carries:
 *   "info"   - the /api/v1/info document of localhost
 *   "charts" - the chart list of localhost
 * Always returns 0.
 */
int aclk_send_info_metadata()
{
    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);

    debug(D_ACLK, "Metadata /info start");

    char *msg_id = create_uuid();
    buffer_flush(wb);
    wb->contenttype = CT_APPLICATION_JSON;

    aclk_create_header(wb, "connect", msg_id);

    // /api/v1/info
    buffer_sprintf(wb, "{\n\t \"info\" : ");
    web_client_api_request_v1_info_fill_buffer(localhost, wb);
    debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, wb->len);

    // charts, then close the payload object and the envelope
    buffer_sprintf(wb, ", \n\t \"charts\" : ");
    charts2json(localhost, wb, 1);
    buffer_sprintf(wb, "\n}\n}");
    debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, wb->len);

    aclk_send_message(ACLK_METADATA_TOPIC, wb->buffer, msg_id);

    freez(msg_id);
    buffer_free(wb);
    return 0;
}

void aclk_send_stress_test(size_t size)
{
    char *buffer = mallocz(size);
    if (buffer != NULL)
    {
        for(size_t i=0; i<size; i++)
            buffer[i] = 'x';
        buffer[size-1] = 0;
        time_t time_created = now_realtime_sec();
        sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
        buffer[strlen(buffer)] = '"';
        buffer[size-2] = '}';
        buffer[size-3] = '"';
        aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
        error("Sending stress of size %zu at time %ld", size, time_created);
    }
    free(buffer);
}

/*
 * Send the full metadata set: first the info/charts message, then the alarm message.
 * aclk_send_message() silently drops each one while the link is not connected.
 * Always returns 0.
 */
int aclk_send_metadata()
{
    aclk_send_info_metadata();
    aclk_send_alarm_metadata();
    return 0;
}

/*
 * Suppress individual chart/alarm updates; aclk_update_chart() and aclk_update_alarm()
 * return early while this is in effect.
 */
void aclk_single_update_disable()
{
    aclk_disable_single_updates = 1;
}

/*
 * Re-allow individual chart/alarm updates (undoes aclk_single_update_disable()).
 */
void aclk_single_update_enable()
{
    aclk_disable_single_updates = 0;
}

/*
 * Triggered by a health reload. Queues an ACLK_CMD_ONCONNECT command, presumably so that
 * the complete metadata is sent again. Nothing is queued while the agent is still
 * initializing. A queueing failure is only logged while the link is connected.
 */
void aclk_alarm_reload()
{
    if (unlikely(agent_state == AGENT_INITIALIZING))
        return;

    if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)) && likely(aclk_connected)) {
        errno = 0;
        error("ACLK failed to queue on_connect command on alarm reload");
    }
}
//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)

/*
 * Send the JSON definition of one chart of host `hostname` as a "chart" message on
 * ACLK_CHART_TOPIC. The chart is looked up with rrdset_find() and, failing that, with
 * rrdset_find_byname().
 * Returns 1 if the host or the chart is not found, 0 otherwise.
 */
int aclk_send_single_chart(char *hostname, char *chart)
{
    RRDHOST *target_host = rrdhost_find_by_hostname(hostname, 0);
    if (!target_host)
        return 1;

    RRDSET *st = rrdset_find(target_host, chart);
    if (!st)
        st = rrdset_find_byname(target_host, chart);
    if (!st) {
        info("FAILED to find chart %s", chart);
        return 1;
    }

    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(wb);
    wb->contenttype = CT_APPLICATION_JSON;

    aclk_create_header(wb, "chart", msg_id);
    rrdset2json(st, wb, NULL, NULL, 1);
    buffer_sprintf(wb, "\t\n}");  // close the envelope

    aclk_send_message(ACLK_CHART_TOPIC, wb->buffer, msg_id);

    freez(msg_id);
    buffer_free(wb);
    return 0;
}

/*
 * Queue a chart update command (`aclk_cmd`) for `chart_name` on localhost.
 *
 * It is a no-op that returns 0 when:
 *   - ACLK is compiled out,
 *   - cloud support is disabled,
 *   - `host` is not localhost,
 *   - single updates are disabled.
 * While the agent is still initializing it only records the time in last_init_sequence
 * instead of queuing. A queueing failure is logged only while the link is connected.
 * Always returns 0.
 */
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
{
#ifndef ENABLE_ACLK
    UNUSED(host);
    UNUSED(chart_name);
    UNUSED(aclk_cmd);
    return 0;
#else
    if (!netdata_cloud_setting)
        return 0;

    if (host != localhost)
        return 0;

    if (unlikely(aclk_disable_single_updates))
        return 0;

    if (unlikely(agent_state == AGENT_INITIALIZING))
        last_init_sequence = now_realtime_sec();
    else {
        if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
            if (likely(aclk_connected)) {
                errno = 0;
                error("ACLK failed to queue chart_update command");
            }
        }
    }
    return 0;
#endif
}

/*
 * Queue a "status-change" message describing alarm entry `ae` of `host`.
 *
 * Nothing is queued for hosts other than localhost or while the agent is still
 * initializing. Nothing is queued while single updates are disabled either: that is the
 * case during a health reload, when all alarms are dropped and recreated and the
 * complete alarm metadata is sent at the end instead.
 * A queueing failure is logged only while the link is connected. Always returns 0.
 */
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
    if (host != localhost || unlikely(agent_state == AGENT_INITIALIZING))
        return 0;

    if (unlikely(aclk_disable_single_updates))
        return 0;

    BUFFER *wb = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(wb);
    aclk_create_header(wb, "status-change", msg_id);

    // serialize the entry while holding the alarm log read lock
    netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
    health_alarm_entry2json_nolock(wb, ae, host);
    netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);

    buffer_sprintf(wb, "\n}");

    if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, wb->buffer, 0, 1, ACLK_CMD_ALARM)) &&
        likely(aclk_connected)) {
        errno = 0;
        error("ACLK failed to queue alarm_command on alarm_update");
    }

    freez(msg_id);
    buffer_free(wb);

    return 0;
}

/*
 * Parse an incoming cloud-to-agent message and hand it to aclk_submit_request().
 *
 * Returns 0 without doing anything while the agent is initializing or when `payload`
 * is NULL.
 * Returns 1 (after freeing whatever the parser allocated) when the message is rejected:
 *   - the JSON is malformed,
 *   - payload, callback_topic, msg_id or type is missing,
 *   - the version is newer than ACLK_VERSION,
 *   - the type is not "http".
 * Otherwise the request is submitted and 0 is returned. A submission failure is only
 * logged at debug level.
 */
int aclk_handle_cloud_request(char *payload)
{
    struct aclk_request cloud_to_agent = {
        .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
    };

    if (unlikely(agent_state == AGENT_INITIALIZING)) {
        debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
        return 0;
    }

    if (unlikely(!payload)) {
        debug(D_ACLK, "ACLK incoming message is empty");
        return 0;
    }

    debug(D_ACLK, "ACLK incoming message (%s)", payload);

    int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);

    int parsed_ok = (JSON_OK == rc);
    int complete = cloud_to_agent.payload && cloud_to_agent.callback_topic && cloud_to_agent.msg_id &&
                   cloud_to_agent.type_id;
    int too_new = cloud_to_agent.version > ACLK_VERSION;
    // strcmp() only runs once type_id is known to be present.
    int acceptable = parsed_ok && complete && !too_new && !strcmp(cloud_to_agent.type_id, "http");

    if (unlikely(!acceptable)) {
        if (!parsed_ok)
            error("Malformed json request (%s)", payload);

        if (too_new)
            error("Unsupported version in JSON request %d", cloud_to_agent.version);

        // release whatever the parser managed to allocate
        if (cloud_to_agent.payload)
            freez(cloud_to_agent.payload);
        if (cloud_to_agent.type_id)
            freez(cloud_to_agent.type_id);
        if (cloud_to_agent.msg_id)
            freez(cloud_to_agent.msg_id);
        if (cloud_to_agent.callback_topic)
            freez(cloud_to_agent.callback_topic);

        return 1;
    }

    if (unlikely(aclk_submit_request(&cloud_to_agent)))
        debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);

    // Note: the payload comes from the callback and it will be automatically freed
    return 0;
}
